-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Add backend for topic events view based on offset range #2551
Feature: Add backend for topic events view based on offset range #2551
Conversation
@@ -72,7 +79,8 @@ public Map<Long, String> readEvents( | |||
Set<TopicPartition> topicPartitionsSet = consumer.assignment(); | |||
|
|||
Set<TopicPartition> partitionsAssignment = new HashSet<>(); | |||
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION)) { | |||
if (offsetPosition.equals(CUSTOM_OFFSET_SELECTION) | |||
|| offsetPosition.equals(RANGE_OFFSET_SELECTION)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of RANGE_OFFSET_SELECTION, we should check if start and end offsets are present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my testing if the url pattern is not respected then there is a 403, thats why I had to change ClusterApiService.java. So it is impossible that these values will not be present at all. Therefore I made changes to ensure that they are not negative.
@@ -18,6 +19,8 @@ | |||
public class TopicContentsService { | |||
|
|||
public static final String CUSTOM_OFFSET_SELECTION = "custom"; | |||
public static final String RANGE_OFFSET_SELECTION = "range"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how abt having an enum for these two custom/range ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -23,7 +23,7 @@ public class TopicContentsController { | |||
value = | |||
"/getTopicContents/{bootstrapServers}/" | |||
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/" | |||
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}", | |||
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can offset position be custom or range or id from FE ? so we don't introduce this action in url ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I work on FE I will make appropriate changes so that the FE will give offsetPosition
as the two existing ones we have (number and custom) plus it will also give range, if thats what the user selects. I preferred to keep this PR only for BE first, then I can work on FE and make another PR.
As far as changes in url, I was a little confused as to how to approach the problem. In current behavior the offsetPosition
is used in two cases
offsetPosition
can be a number. in this case we ignore theselectedNumberOfOffsets
andselectedPartitionId
value in the url. Using theoffsetPosition
number we seek to beginning of topic and pull all data for all partitions upto the givenoffsetPosition
number.offsetPosition
is the string value custom in this case we use the provided values ofselectedNumberOfOffsets
andselectedPartitionId
to only pull data from the provided partition upto the the number provided inselectedNumberOfOffsets
Therefore in my case I felt I had the following options
- Reuse the existing url and try to do the range, in this case I have to figure out how to reuse
offsetPosition
,selectedNumberOfOffsets
andselectedPartitionId
. IfoffsetPosition
is used to provide the value range then that leaves me with the other two for start and end of range, therefore the data will be for ALL partitions. - Make some changes to url to accommodate the range.
Would you prefer that we keep the existing url exactly same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about offsetPosition = { number, custom, range }
Existing situation, if it's number, or custom, nothing changes in the url.
If it's range, we need partitionId, start and end offsets.
Looks good the way you have done it.
"/getTopicContents/{bootstrapServers}/"
+ "{protocol}/{consumerGroupId}/{topicName}/{offsetPosition}/partitionId/{selectedPartitionId}/"
+ "selectedNumberOfOffsets/{selectedNumberOfOffsets}/{clusterIdentification}/"
+ "rangeOffsets/{rangeOffsetsStart}/{rangeOffsetsEnd}",
for (ConsumerRecord<String, String> record : consumerRecords) { | ||
eventMap.put(record.offset(), record.value()); | ||
if (offsetPosition.equals(RANGE_OFFSET_SELECTION) | ||
&& (record.offset() >= rangeOffsetsEnd || eventMap.size() >= RANGE_MAX_RECORDS)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need range offsets start position ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@khatibtamal thanks for the pr. I have not tested yet locally, but have a couple of comments. And how should frontend changes should look like ?
@muralibasani thanks for the review. I tested this locally using postman, and created TopicContentsControllerIT.java to thoroughly test all the new and legacy cases. As far as front end, I was hoping to first merge the BE and then do the FE in another PR. My understanding of the architecture for this project is that the FE never directly communicates with cluster-api it communicates via core and the change I made in core for the file ClusterApiService.java should ensure that even if this PR is merged, there should be no impact on FE usage and existing behavior. |
Hi @khatibtamal it's correct, FE never directly communicates with cluster api. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your comments. Looks good.
After the enum changes, will test locally.
@muralibasani pushed changes |
Signed-off-by: khatib tamal <[email protected]>
Signed-off-by: khatib tamal <[email protected]>
Signed-off-by: khatib tamal <[email protected]>
Signed-off-by: khatib tamal <[email protected]>
Signed-off-by: khatib tamal <[email protected]>
f8cae7d
to
44d2450
Compare
public class TopicContentsControllerIT { | ||
|
||
public static final String CUSTOM_SELECTION = "custom"; | ||
public static final String RANGE_SELECTION = "range"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse the enum here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
topics = {TopicContentsControllerIT.TEST_TOPIC_NAME}) | ||
public class TopicContentsControllerIT { | ||
|
||
public static final String CUSTOM_SELECTION = "custom"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse the enum here ?
|
||
@Test | ||
@Order(13) | ||
void getTopicContentsWhenRangeAndTotalOffsetsLargerThanMax() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beautiful tests, I love them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
@muralibasani I had to make some changes in ClusterApiController.java in my last commit. This is because build was failing locally, after investigation I found out it was because of embeddedKafkaBroker which was being initialized in a dynamicPropertySource method, and the broker is not being destroyed after the tests end. This caused a port conflict with newly added tests in TopicContentsControllerIT, as port 9092 was not being shut. Since EmbeddedKafkaZKBroker is implemented such that the intention is for it to be used as a spring bean (it extends from EmbeddedKafkaBroker which implements DisposableBean and InitializationBean), therefore I removed code for its manual instantiation, and relied on the bean provided by EmbeddedKafka annotation. This way spring will take care of the Initiation and Disposed to ensure the context gets properly created and destroyed for the tests in the class. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Approved.
I assume you mean the changes to ClusterApiControllerIT ? |
Yes ClusterApiController.java. Sorry about the first link. Thanks for the review. Should I open another issue with the Front End requirements? Or just re open #1987 ? Thanks. |
Yes pls, now you have the full context. Thanks again. |
Linked issue
Resolves: #1987
What kind of change does this PR introduce?
What is the current behavior?
Currently it is possible to view only last few selected offsets of partitions.
What is the new behavior?
Backend infrastructure to be able to view topic events based on range.
Other information
This PR will not affect anything in terms of front end usage yet, as a front end PR needs to be created and merged after this.
Requirements (all must be checked before review)
main
branch have been pulledpnpm lint
has been run successfully